package com.itzx.networkflow_analysis

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

// 输入数据样例类
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)

// 窗口聚合结果样例类
case class UrlViewCount(url: String, windowEnd: Long, count: Long)

/**
 * 基于服务器 log 的热门页面浏览量统计
 *
 * author: yyeleven
 * create: 2020/3/22 11:21
 */
object NetworkFlow {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 读物数据
    val sourceStream: DataStream[String] = env.readTextFile("E:\\workroom_own\\UserBehaviorAnalysis\\NetwarkFlowAnalysis\\src\\main\\resources\\apache.log")

    val dataStream = sourceStream.map(data => {
      val dataArray: Array[String] = data.split("\\s+")

      //定义时间转换
      val simpleDateFormat: SimpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
      val timeStamp: Long = simpleDateFormat.parse(dataArray(3).trim).getTime

      ApacheLogEvent(dataArray(0).trim, dataArray(1).trim, timeStamp, dataArray(5), dataArray(6).trim)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
      override def extractTimestamp(t: ApacheLogEvent): Long = t.eventTime
    }).filter(data => {
      val pattern = "^((?!\\.(css|js)$).)*$".r
      (pattern findFirstIn data.url).nonEmpty
    })
      .keyBy(_.url)
        .timeWindow(Time.minutes(10), Time.seconds(5))
        .allowedLateness(Time.seconds(60))
        .aggregate(new CountAgg(), new WindowResult())
        .keyBy(_.windowEnd)
        .process(new TopNHotUrls(5))

    dataStream.print()

    env.execute()
  }

  class CountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
    override def createAccumulator(): Long = 0L

    override def add(in: ApacheLogEvent, acc: Long): Long = acc + 1

    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = acc + acc1
  }

  // 自定义窗口处理函数
  class WindowResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
      out.collect(UrlViewCount(key, window.getEnd, input.iterator.next()))
    }
  }

}

// 自定义根据窗口,排序输出处理函数
class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {

  lazy val urlState:  ListState[UrlViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[UrlViewCount]("url-state", classOf[UrlViewCount]))

  override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context, out: Collector[String]): Unit = {
    urlState.add(value)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // 从状态中拿到所有函数
    val allUrlViews: ListBuffer[UrlViewCount] = new ListBuffer[UrlViewCount]()

    val iter: util.Iterator[UrlViewCount] = urlState.get().iterator()

    while (iter.hasNext) {
      allUrlViews += iter.next()
    }
    urlState.clear()

    // 从高到低排序, 降序排列
    val sortedUrlViews: ListBuffer[UrlViewCount] = allUrlViews.sortWith(_.count > _.count).take(topSize)

    // 格式化结果输出
    val result: StringBuilder = new StringBuilder()

    result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n")
    for (i <- sortedUrlViews.indices) {
      val currentUrlView: UrlViewCount = sortedUrlViews(i)
      result.append("NO").append(i + 1).append(":")
        .append(" URL= ").append(currentUrlView.url)
        .append(" 访问量=").append(currentUrlView.count)
        .append("\n")
    }

    result.append("=============================")
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
